package com.gao.flying.mq.connection;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;

import com.gao.flying.mq.util.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SingleConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, ExceptionListener, InitializingBean, DisposableBean {

    /** Logger obtained for the runtime class of this instance (via {@code getClass()}). */
    protected final Logger logger = LoggerFactory.getLogger(getClass());

    /** Factory used to lazily create the shared Connection; stays {@code null} if only a Connection was supplied. */
    private ConnectionFactory targetConnectionFactory;

    /** Optional JMS client ID that {@code prepareConnection} applies to the shared Connection. */
    private String clientId;

    /** Optional user-supplied ExceptionListener that {@code prepareConnection} registers on the shared Connection. */
    private ExceptionListener exceptionListener;

    /** Whether this factory registers itself as ExceptionListener in order to reset the Connection on a JMSException. */
    private boolean reconnectOnException = false;

    /** The target Connection; {@code null} until it is created and again after {@code resetConnection()}. */
    private Connection connection;

    /** A hint whether to create a queue ({@code FALSE}) or topic ({@code TRUE}) connection; {@code null} = no hint. */
    private Boolean pubSubMode;

    /** An internal aggregator allowing for per-connection ExceptionListeners; created lazily by {@code prepareConnection}. */
    private AggregatedExceptionListener aggregatedExceptionListener;

    /**
     * Number of connection proxies that are currently counted as started. Incremented by a proxy's {@code start()},
     * decremented by its {@code stop()}/{@code close()}; a newly initialized shared Connection is started when this
     * is greater than zero.
     */
    private int startedCount = 0;

    /** Synchronization monitor guarding the shared Connection and the start/listener bookkeeping around it. */
    private final Object connectionMonitor = new Object();

    /**
     * Create a new SingleConnectionFactory for bean-style usage.
     * <p>
     * This constructor configures nothing: neither a Connection nor a target ConnectionFactory is set, so one has to
     * be supplied through the setter before the factory can hand out a Connection.
     * 
     * @see #setTargetConnectionFactory
     */
    public SingleConnectionFactory() {
    }

    /**
     * Create a new SingleConnectionFactory that always returns the given Connection.
     * <p>
     * The argument is checked with {@code Assert.notNull} and then stored as the shared Connection. No target
     * ConnectionFactory is set here, so once this Connection has been reset it can only be re-created if a target
     * ConnectionFactory is configured afterwards.
     * 
     * @param targetConnection
     *            the single Connection (must not be {@code null})
     */
    public SingleConnectionFactory(Connection targetConnection) {
        Assert.notNull(targetConnection, "Target Connection must not be null");
        this.connection = targetConnection;
    }

    /**
     * Create a new SingleConnectionFactory that always returns a single Connection that it will lazily create via the
     * given target ConnectionFactory.
     * <p>
     * The argument is checked with {@code Assert.notNull}; no Connection is created by this constructor itself.
     * 
     * @param targetConnectionFactory
     *            the target ConnectionFactory (must not be {@code null})
     */
    public SingleConnectionFactory(ConnectionFactory targetConnectionFactory) {
        Assert.notNull(targetConnectionFactory, "Target ConnectionFactory must not be null");
        this.targetConnectionFactory = targetConnectionFactory;
    }

    /**
     * Set the target ConnectionFactory which will be used to lazily create a single Connection.
     * <p>
     * The value is stored without a null check; a missing factory is only detected later, for example when
     * {@link #initConnection()} is asked to create the Connection.
     * 
     * @param targetConnectionFactory
     *            the factory to create the shared Connection from
     */
    public void setTargetConnectionFactory(ConnectionFactory targetConnectionFactory) {
        this.targetConnectionFactory = targetConnectionFactory;
    }

    /**
     * Return the target ConnectionFactory which will be used to lazily create a single Connection, if any.
     * 
     * @return the configured target ConnectionFactory, or {@code null} if none has been set
     */
    public ConnectionFactory getTargetConnectionFactory() {
        return this.targetConnectionFactory;
    }

    /**
     * Specify a JMS client ID for the single Connection created and exposed by this ConnectionFactory.
     * <p>
     * Note that client IDs need to be unique among all active Connections of the underlying JMS provider. Furthermore,
     * a client ID can only be assigned if the original ConnectionFactory hasn't already assigned one.
     * <p>
     * The value is only stored here; it is applied to a Connection when that Connection is prepared.
     * 
     * @param clientId
     *            the client ID to apply, or {@code null} to apply none
     * @see Connection#setClientID
     * @see #setTargetConnectionFactory
     */
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    /**
     * Return a JMS client ID for the single Connection created and exposed by this ConnectionFactory, if any.
     * 
     * @return the configured client ID, or {@code null} if none has been set
     */
    protected String getClientId() {
        return this.clientId;
    }

    /**
     * Specify a JMS ExceptionListener implementation that should be registered with the single Connection created by
     * this factory.
     * <p>
     * The listener is only stored here; it is registered when a Connection is prepared.
     * 
     * @param exceptionListener
     *            the listener to register, or {@code null} for none
     * @see #setReconnectOnException
     */
    public void setExceptionListener(ExceptionListener exceptionListener) {
        this.exceptionListener = exceptionListener;
    }

    /**
     * Return the JMS ExceptionListener implementation that should be registered with the single Connection created by
     * this factory, if any.
     * 
     * @return the configured ExceptionListener, or {@code null} if none has been set
     */
    protected ExceptionListener getExceptionListener() {
        return this.exceptionListener;
    }

    /**
     * Specify whether the single Connection should be reset (to be subsequently renewed) when a JMSException is
     * reported by the underlying Connection.
     * <p>
     * Default is "false". Switch this to "true" to automatically trigger recovery based on your JMS provider's
     * exception notifications.
     * <p>
     * Internally, this will lead to a special JMS ExceptionListener (this SingleConnectionFactory itself) being
     * registered with the underlying Connection. This can also be combined with a user-specified ExceptionListener, if
     * desired.
     * <p>
     * The flag is only stored here; it is evaluated when a Connection is prepared.
     * 
     * @param reconnectOnException
     *            {@code true} to reset the shared Connection on a reported JMSException
     * @see #setExceptionListener
     */
    public void setReconnectOnException(boolean reconnectOnException) {
        this.reconnectOnException = reconnectOnException;
    }

    /**
     * Return whether the single Connection should be renewed when a JMSException is reported by the underlying
     * Connection.
     * 
     * @return the configured flag ({@code false} unless changed through the setter)
     */
    protected boolean isReconnectOnException() {
        return this.reconnectOnException;
    }

    /**
     * Validate the configuration: a Connection or, failing that, a target ConnectionFactory must be present.
     * 
     * @throws IllegalArgumentException
     *             if neither a Connection nor a target ConnectionFactory is available
     */
    public void afterPropertiesSet() {
        // A pre-supplied Connection is sufficient on its own.
        if (this.connection != null) {
            return;
        }
        // Otherwise a factory must be available to create the Connection lazily.
        if (getTargetConnectionFactory() != null) {
            return;
        }
        throw new IllegalArgumentException("Target Connection or ConnectionFactory is required");
    }

    /**
     * Return a proxy for the shared Connection, creating the shared Connection first if necessary.
     * 
     * @return a Connection proxy obtained from {@link #getSharedConnectionProxy}
     * @throws JMSException
     *             if obtaining the shared Connection fails
     */
    public Connection createConnection() throws JMSException {
        Connection shared = getConnection();
        return getSharedConnectionProxy(shared);
    }

    /**
     * Not supported: this factory does not accept per-call credentials.
     * 
     * @param username
     *            ignored
     * @param password
     *            ignored
     * @throws javax.jms.IllegalStateException
     *             always
     */
    public Connection createConnection(String username, String password) throws JMSException {
        throw new javax.jms.IllegalStateException("SingleConnectionFactory does not support custom username and password");
    }

    /**
     * Return the shared Connection proxy as a QueueConnection.
     * <p>
     * Sets the queue hint ({@code pubSubMode = FALSE}) before obtaining the proxy, so that a Connection created as a
     * result of this call is requested as a queue connection where the target factory supports it.
     * 
     * @return the shared Connection proxy, typed as QueueConnection
     * @throws javax.jms.IllegalStateException
     *             if the proxy does not implement QueueConnection
     * @throws JMSException
     *             if obtaining the shared Connection fails
     */
    public QueueConnection createQueueConnection() throws JMSException {
        Connection proxy;
        synchronized (this.connectionMonitor) {
            // Hint and creation happen under one lock so the hint applies to this creation attempt.
            this.pubSubMode = Boolean.FALSE;
            proxy = createConnection();
        }
        if (proxy instanceof QueueConnection) {
            return (QueueConnection) proxy;
        }
        throw new javax.jms.IllegalStateException("This SingleConnectionFactory does not hold a QueueConnection but rather: " + proxy);
    }

    /**
     * Not supported: this factory does not accept per-call credentials.
     * 
     * @param username
     *            ignored
     * @param password
     *            ignored
     * @throws javax.jms.IllegalStateException
     *             always
     */
    public QueueConnection createQueueConnection(String username, String password) throws JMSException {
        throw new javax.jms.IllegalStateException("SingleConnectionFactory does not support custom username and password");
    }

    /**
     * Return the shared Connection proxy as a TopicConnection.
     * <p>
     * Sets the topic hint ({@code pubSubMode = TRUE}) before obtaining the proxy, so that a Connection created as a
     * result of this call is requested as a topic connection where the target factory supports it.
     * 
     * @return the shared Connection proxy, typed as TopicConnection
     * @throws javax.jms.IllegalStateException
     *             if the proxy does not implement TopicConnection
     * @throws JMSException
     *             if obtaining the shared Connection fails
     */
    public TopicConnection createTopicConnection() throws JMSException {
        Connection proxy;
        synchronized (this.connectionMonitor) {
            // Hint and creation happen under one lock so the hint applies to this creation attempt.
            this.pubSubMode = Boolean.TRUE;
            proxy = createConnection();
        }
        if (proxy instanceof TopicConnection) {
            return (TopicConnection) proxy;
        }
        throw new javax.jms.IllegalStateException("This SingleConnectionFactory does not hold a TopicConnection but rather: " + proxy);
    }

    /**
     * Not supported: this factory does not accept per-call credentials.
     * 
     * @param username
     *            ignored
     * @param password
     *            ignored
     * @throws javax.jms.IllegalStateException
     *             always
     */
    public TopicConnection createTopicConnection(String username, String password) throws JMSException {
        throw new javax.jms.IllegalStateException("SingleConnectionFactory does not support custom username and password");
    }

    /**
     * Return the shared Connection, initializing it on first use.
     * <p>
     * The check and the initialization both run while holding the connection monitor.
     * 
     * @return the shared Connection
     * @throws JMSException
     *             if thrown by JMS API methods during initialization
     * @see #initConnection()
     */
    protected Connection getConnection() throws JMSException {
        synchronized (this.connectionMonitor) {
            // Fast path: a Connection is already in place.
            if (this.connection != null) {
                return this.connection;
            }
            initConnection();
            return this.connection;
        }
    }

    /**
     * Initialize the underlying shared Connection.
     * <p>
     * Closes and reinitializes the Connection if an underlying Connection is present already. The previous Connection
     * is dropped as soon as it has been closed, and the new Connection only becomes the shared Connection after
     * {@link #prepareConnection} has completed. If preparation fails, the new Connection is closed again and no shared
     * Connection is left behind, so the next access retries from scratch instead of handing out a closed or
     * half-configured Connection.
     * 
     * @throws IllegalStateException
     *             if no target ConnectionFactory has been configured
     * @throws JMSException
     *             if thrown by JMS API methods
     * @see #prepareConnection
     */
    public void initConnection() throws JMSException {
        if (getTargetConnectionFactory() == null) {
            throw new IllegalStateException("'targetConnectionFactory' is required for lazily initializing a Connection");
        }
        synchronized (this.connectionMonitor) {
            if (this.connection != null) {
                closeConnection(this.connection);
                // Forget the closed Connection immediately: if creating the replacement fails below,
                // getConnection() must not keep returning a Connection that has already been closed.
                this.connection = null;
            }
            Connection con = doCreateConnection();
            boolean prepared = false;
            try {
                prepareConnection(con);
                prepared = true;
            } finally {
                if (!prepared) {
                    // Release the unused Connection; the original failure keeps propagating.
                    try {
                        con.close();
                    } catch (Throwable closeEx) {
                        logger.debug("Could not close newly created JMS Connection", closeEx);
                    }
                }
            }
            this.connection = con;
            // Proxies that were started before this (re)initialization expect a running Connection.
            if (this.startedCount > 0) {
                this.connection.start();
            }
            if (logger.isInfoEnabled()) {
                logger.info("Established shared JMS Connection: " + this.connection);
            }
        }
    }

    /**
     * Exception listener callback that resets the underlying single Connection.
     * <p>
     * Logs the exception at warn level and then calls {@link #resetConnection()}.
     * 
     * @param ex
     *            the JMSException that was reported
     * @see #resetConnection()
     */

    public void onException(JMSException ex) {
        logger.warn("Encountered a JMSException - resetting the underlying JMS Connection", ex);
        resetConnection();
    }

    /**
     * Close the underlying shared connection. The provider of this ConnectionFactory needs to care for proper shutdown.
     * <p>
     * As this bean implements DisposableBean, a bean factory will automatically invoke this on destruction of its
     * cached singletons.
     * <p>
     * This simply delegates to {@link #resetConnection()}.
     * 
     * @see #resetConnection()
     */

    public void destroy() {
        resetConnection();
    }

    /**
     * Close the current shared Connection, if any, and clear the reference to it.
     * <p>
     * Runs while holding the connection monitor. After this call the factory holds no Connection.
     * 
     * @see #closeConnection
     */
    public void resetConnection() {
        synchronized (this.connectionMonitor) {
            Connection current = this.connection;
            if (current != null) {
                closeConnection(current);
            }
            this.connection = null;
        }
    }

    /**
     * Create a JMS Connection via the target ConnectionFactory.
     * <p>
     * Honors the queue/topic hint when the target factory implements the matching domain-specific interface;
     * otherwise falls back to the generic {@code createConnection()} call.
     * 
     * @return the new JMS Connection
     * @throws JMSException
     *             if thrown by JMS API methods
     */
    protected Connection doCreateConnection() throws JMSException {
        ConnectionFactory factory = getTargetConnectionFactory();
        // Queue hint: only usable if the factory really is a QueueConnectionFactory.
        if (Boolean.FALSE.equals(this.pubSubMode) && factory instanceof QueueConnectionFactory) {
            QueueConnectionFactory queueFactory = (QueueConnectionFactory) factory;
            return queueFactory.createQueueConnection();
        }
        // Topic hint: only usable if the factory really is a TopicConnectionFactory.
        if (Boolean.TRUE.equals(this.pubSubMode) && factory instanceof TopicConnectionFactory) {
            TopicConnectionFactory topicFactory = (TopicConnectionFactory) factory;
            return topicFactory.createTopicConnection();
        }
        // No hint, or the factory does not support the hinted domain.
        return getTargetConnectionFactory().createConnection();
    }

    /**
     * Configure a freshly created Connection before it is handed out.
     * <p>
     * Applies the client ID, if one is configured, and then registers an ExceptionListener:
     * <ul>
     * <li>an already existing aggregated listener is reused as-is;</li>
     * <li>with reconnect-on-exception active, a new aggregated listener is created that contains this factory first
     * and the configured listener (if any) second;</li>
     * <li>otherwise the configured listener, if any, is registered directly.</li>
     * </ul>
     * Can be overridden in subclasses.
     * 
     * @param con
     *            the Connection to prepare
     * @throws JMSException
     *             if thrown by JMS API methods
     * @see #setExceptionListener
     * @see #setReconnectOnException
     */
    protected void prepareConnection(Connection con) throws JMSException {
        if (getClientId() != null) {
            con.setClientID(getClientId());
        }

        // An aggregate from an earlier Connection keeps its registered delegates.
        if (this.aggregatedExceptionListener != null) {
            con.setExceptionListener(this.aggregatedExceptionListener);
            return;
        }

        // Nothing to register at all.
        if (getExceptionListener() == null && !isReconnectOnException()) {
            return;
        }

        ExceptionListener userListener = getExceptionListener();
        if (!isReconnectOnException()) {
            con.setExceptionListener(userListener);
            return;
        }

        // Reconnect mode: this factory goes first, then the user-specified listener.
        AggregatedExceptionListener aggregate = new AggregatedExceptionListener();
        this.aggregatedExceptionListener = aggregate;
        aggregate.delegates.add(this);
        if (userListener != null) {
            aggregate.delegates.add(userListener);
        }
        con.setExceptionListener(aggregate);
    }

    /**
     * Template method for obtaining a (potentially cached) Session.
     * <p>
     * The default implementation always returns {@code null}. Subclasses may override this for exposing specific
     * Session handles, possibly delegating to {@link #createSession} for the creation of raw Session objects that will
     * then get wrapped and returned from here.
     * <p>
     * The connection proxy calls this for {@code createSession}, {@code createQueueSession} and
     * {@code createTopicSession}; on a {@code null} result it forwards the call to the shared Connection instead.
     * 
     * @param con
     *            the JMS Connection to operate on
     * @param mode
     *            the Session acknowledgement mode ({@code Session.TRANSACTED} or one of the common modes)
     * @return the Session to use, or {@code null} to indicate creation of a raw standard Session
     * @throws JMSException
     *             if thrown by the JMS API
     */
    protected Session getSession(Connection con, Integer mode) throws JMSException {
        return null;
    }

    /**
     * Create a raw Session on the given Connection, using the domain-specific factory method when the queue/topic
     * hint and the Connection type match.
     * <p>
     * A mode of {@code Session.SESSION_TRANSACTED} yields a transacted Session created with
     * {@code Session.AUTO_ACKNOWLEDGE}; any other mode yields a non-transacted Session with that acknowledge mode.
     * 
     * @param con
     *            the JMS Connection to operate on
     * @param mode
     *            the Session acknowledgement mode ({@code Session.SESSION_TRANSACTED} or one of the common modes)
     * @return the newly created Session
     * @throws JMSException
     *             if thrown by the JMS API
     */
    protected Session createSession(Connection con, Integer mode) throws JMSException {
        // Translate the single mode value into the (transacted, ackMode) pair of the JMS API.
        final boolean transacted;
        final int ackMode;
        if (mode == Session.SESSION_TRANSACTED) {
            transacted = true;
            ackMode = Session.AUTO_ACKNOWLEDGE;
        } else {
            transacted = false;
            ackMode = mode;
        }

        if (Boolean.FALSE.equals(this.pubSubMode) && con instanceof QueueConnection) {
            QueueConnection queueCon = (QueueConnection) con;
            return queueCon.createQueueSession(transacted, ackMode);
        }
        if (Boolean.TRUE.equals(this.pubSubMode) && con instanceof TopicConnection) {
            TopicConnection topicCon = (TopicConnection) con;
            return topicCon.createTopicSession(transacted, ackMode);
        }
        return con.createSession(transacted, ackMode);
    }

    /**
     * Close the given Connection.
     * <p>
     * Stops the Connection first if any proxy is currently counted as started, then closes it. Anything thrown by
     * {@code stop()} or {@code close()} is logged at debug level and not propagated to the caller.
     * 
     * @param con
     *            the Connection to close
     */
    protected void closeConnection(Connection con) {
        if (logger.isDebugEnabled()) {
            logger.debug("Closing shared JMS Connection: " + con);
        }
        try {
            try {
                // Only stop when at least one proxy has started the shared Connection.
                if (this.startedCount > 0) {
                    con.stop();
                }
            } finally {
                // Always attempt the close, even if stop() failed.
                con.close();
            }
        } catch (javax.jms.IllegalStateException ex) {
            // Logged without stack trace: treated as "already closed".
            logger.debug("Ignoring Connection state exception - assuming already closed: " + ex);
        } catch (Throwable ex) {
            // Best-effort close: any other failure is logged and swallowed.
            logger.debug("Could not close shared JMS Connection", ex);
        }
    }

    /**
     * Create a proxy for the shared Connection that application code can treat like an ordinary Connection.
     * <p>
     * The proxy implements {@code Connection} plus {@code QueueConnection} and/or {@code TopicConnection} when the
     * given target implements them. Calls are handled by a new {@code SharedConnectionInvocationHandler}, which
     * suppresses close calls and resolves the shared Connection itself on each invocation.
     * 
     * @param target
     *            the original Connection, used to decide which interfaces the proxy exposes
     * @return the wrapped Connection
     */
    protected Connection getSharedConnectionProxy(Connection target) {
        List<Class<?>> interfaces = new ArrayList<Class<?>>(3);
        interfaces.add(Connection.class);
        if (target instanceof QueueConnection) {
            interfaces.add(QueueConnection.class);
        }
        if (target instanceof TopicConnection) {
            interfaces.add(TopicConnection.class);
        }

        ClassLoader loader = Connection.class.getClassLoader();
        Class<?>[] interfaceArray = interfaces.toArray(new Class<?>[interfaces.size()]);
        InvocationHandler handler = new SharedConnectionInvocationHandler();
        return (Connection) Proxy.newProxyInstance(loader, interfaceArray, handler);
    }

    /**
     * Invocation handler for a cached JMS Connection proxy.
     * <p>
     * One handler instance backs each proxy. It keeps per-proxy state (a local ExceptionListener and a "locally
     * started" flag) and maps lifecycle calls onto the shared Connection:
     * <ul>
     * <li>{@code start}/{@code stop} are counted through the factory's {@code startedCount}; the shared Connection is
     * started by the first started proxy and stopped when the last started proxy stops;</li>
     * <li>{@code close} never closes the shared Connection, it only stops this proxy and unregisters its local
     * ExceptionListener;</li>
     * <li>{@code setClientID} is rejected unless the requested ID equals the Connection's current one;</li>
     * <li>{@code setExceptionListener} is only supported while an aggregated listener exists;</li>
     * <li>session creation is first offered to the factory's {@code getSession} template method.</li>
     * </ul>
     * Every other call is forwarded reflectively to the current shared Connection.
     */
    private class SharedConnectionInvocationHandler implements InvocationHandler {

        /** Listener registered through this proxy, if any (guarded by the connection monitor). */
        private ExceptionListener localExceptionListener;

        /** Whether this proxy currently contributes to {@code startedCount} (guarded by the connection monitor). */
        private boolean locallyStarted = false;

        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String name = method.getName();
            if (name.equals("equals")) {
                // Two proxies are equal when they belong to the same SingleConnectionFactory.
                Object other = args[0];
                if (proxy == other) {
                    return true;
                }
                if (other == null || !Proxy.isProxyClass(other.getClass())) {
                    return false;
                }
                InvocationHandler otherHandler = Proxy.getInvocationHandler(other);
                return (otherHandler instanceof SharedConnectionInvocationHandler && factory() == ((SharedConnectionInvocationHandler) otherHandler).factory());
            } else if (name.equals("hashCode")) {
                // Use hashCode of containing SingleConnectionFactory.
                return System.identityHashCode(factory());
            } else if (name.equals("toString")) {
                return "Shared JMS Connection: " + getConnection();
            } else if (name.equals("setClientID")) {
                // Handle setClientID method: throw exception if not compatible.
                String currentClientId = getConnection().getClientID();
                if (currentClientId != null && currentClientId.equals(args[0])) {
                    return null;
                } else {
                    throw new javax.jms.IllegalStateException("setClientID call not supported on proxy for shared Connection. " + "Set the 'clientId' property on the SingleConnectionFactory instead.");
                }
            } else if (name.equals("setExceptionListener")) {
                // Handle setExceptionListener method: add to the chain.
                synchronized (connectionMonitor) {
                    if (aggregatedExceptionListener != null) {
                        ExceptionListener listener = (ExceptionListener) args[0];
                        if (listener != this.localExceptionListener) {
                            // Replace this proxy's previous listener rather than accumulating listeners.
                            if (this.localExceptionListener != null) {
                                aggregatedExceptionListener.delegates.remove(this.localExceptionListener);
                            }
                            if (listener != null) {
                                aggregatedExceptionListener.delegates.add(listener);
                            }
                            this.localExceptionListener = listener;
                        }
                        return null;
                    } else {
                        throw new javax.jms.IllegalStateException("setExceptionListener call not supported on proxy for shared Connection. "
                                + "Set the 'exceptionListener' property on the SingleConnectionFactory instead. "
                                + "Alternatively, activate SingleConnectionFactory's 'reconnectOnException' feature, "
                                + "which will allow for registering further ExceptionListeners to the recovery chain.");
                    }
                }
            } else if (name.equals("getExceptionListener")) {
                synchronized (connectionMonitor) {
                    if (this.localExceptionListener != null) {
                        return this.localExceptionListener;
                    } else {
                        return getExceptionListener();
                    }
                }
            } else if (name.equals("start")) {
                localStart();
                return null;
            } else if (name.equals("stop")) {
                localStop();
                return null;
            } else if (name.equals("close")) {
                try {
                    localStop();
                } finally {
                    // Unregister this proxy's listener even if stopping failed, so that a closed
                    // proxy does not keep receiving exception callbacks.
                    synchronized (connectionMonitor) {
                        if (this.localExceptionListener != null) {
                            if (aggregatedExceptionListener != null) {
                                aggregatedExceptionListener.delegates.remove(this.localExceptionListener);
                            }
                            this.localExceptionListener = null;
                        }
                    }
                }
                return null;
            } else if (name.equals("createSession") || name.equals("createQueueSession") || name.equals("createTopicSession")) {
                // Default: JMS 2.0 createSession() method
                Integer mode = Session.AUTO_ACKNOWLEDGE;
                if (args != null) {
                    if (args.length == 1) {
                        // JMS 2.0 createSession(int) method
                        mode = (Integer) args[0];
                    } else if (args.length == 2) {
                        // JMS 1.1 createSession(boolean, int) method
                        boolean transacted = (Boolean) args[0];
                        Integer ackMode = (Integer) args[1];
                        mode = (transacted ? Session.SESSION_TRANSACTED : ackMode);
                    }
                }
                Session session = getSession(getConnection(), mode);
                if (session != null) {
                    if (!method.getReturnType().isInstance(session)) {
                        String msg = "JMS Session does not implement specific domain: " + session;
                        try {
                            session.close();
                        } catch (Throwable ex) {
                            logger.trace("Failed to close newly obtained JMS Session", ex);
                        }
                        throw new javax.jms.IllegalStateException(msg);
                    }
                    return session;
                }
                // A null Session means "no special handling": fall through to the raw Connection call below.
            }
            try {
                return method.invoke(getConnection(), args);
            } catch (InvocationTargetException ex) {
                // Surface the target's own exception instead of the reflection wrapper.
                throw ex.getTargetException();
            }
        }

        /**
         * Count this proxy as started, starting the shared Connection if this is the first started proxy and a
         * Connection exists. The bookkeeping is only updated after the start succeeded, so a failed start can be
         * retried on the same proxy.
         */
        private void localStart() throws JMSException {
            synchronized (connectionMonitor) {
                if (!this.locallyStarted) {
                    if (startedCount == 0 && connection != null) {
                        connection.start();
                    }
                    this.locallyStarted = true;
                    startedCount++;
                }
            }
        }

        /**
         * Stop counting this proxy as started, stopping the shared Connection if this was the last started proxy.
         * The counter is decremented even if the stop call fails, so that it stays in line with the per-proxy flag.
         */
        private void localStop() throws JMSException {
            synchronized (connectionMonitor) {
                if (this.locallyStarted) {
                    this.locallyStarted = false;
                    try {
                        if (startedCount == 1 && connection != null) {
                            connection.stop();
                        }
                    } finally {
                        if (startedCount > 0) {
                            startedCount--;
                        }
                    }
                }
            }
        }

        /** Return the enclosing factory; used for proxy equality and hash code. */
        private SingleConnectionFactory factory() {
            return SingleConnectionFactory.this;
        }
    }

    /**
     * Internal aggregated ExceptionListener for handling the internal recovery listener in combination with
     * user-specified listeners.
     * <p>
     * Delegates are notified in registration order. Notification works on a snapshot taken under the connection
     * monitor and runs outside of it, so a delegate may register or remove listeners (or take its own time) during
     * the callback without causing a {@code ConcurrentModificationException} or blocking other users of the monitor.
     */
    private class AggregatedExceptionListener implements ExceptionListener {

        /** Registered listeners in insertion order; modified only while holding the connection monitor. */
        final Set<ExceptionListener> delegates = new LinkedHashSet<ExceptionListener>(2);

        /**
         * Forward the given exception to every delegate that was registered at the time of the call.
         * 
         * @param ex
         *            the JMSException reported by the Connection
         */
        public void onException(JMSException ex) {
            List<ExceptionListener> snapshot;
            synchronized (connectionMonitor) {
                snapshot = new ArrayList<ExceptionListener>(this.delegates);
            }
            for (ExceptionListener listener : snapshot) {
                listener.onException(ex);
            }
        }
    }

}
